package com.ysy.lock;

import com.alibaba.fastjson.JSON;
import io.netty.util.internal.StringUtil;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @program: zookeeper-demo
 * @description: 分布式锁
 * @author: yeshiyuan
 * @create: 2019-06-13 15:15
 **/
public class DistributedLock implements Lock, Watcher{

    private ZooKeeper zk;

    private String root = "/locks"; //根
    private String lockName; //竞争资源的标志
    private String waitNode; //等待前一个节点
    private String myZnode; //当前锁
    private CountDownLatch latch; //计数器
    private int sessionTimeout = 30000; //30秒
    private List<Exception> exceptions = new ArrayList<Exception>();

    /**
     * 创建分布式锁,使用前请确认config配置的zookeeper服务可用
     * @param host 127.0.0.1:2181
     * @param lockName 竞争资源标志,lockName中不能包含单词lock
     */
    public DistributedLock(String host, String lockName) {
        this.lockName = lockName;
        try {
            zk = new ZooKeeper(host, sessionTimeout, this);
            Stat stat = zk.exists(root, false);
            if (stat == null) {
                zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (IOException e) {
            exceptions.add(e);
        } catch (InterruptedException e) {
            exceptions.add(e);
        } catch (KeeperException e) {
            exceptions.add(e);
        }
    }

    public void lock() {
        if(exceptions.size() > 0){
            throw new LockException(exceptions.get(0));
        }
        try {
            if (this.tryLock()) {
                System.out.println("Thread " + Thread.currentThread().getId() + " " + myZnode + " get lock true");
                return;
            } else {
                waitForLock(waitNode, sessionTimeout, TimeUnit.MILLISECONDS);//等待锁
            }
        } catch (InterruptedException e) {
            throw new LockException(e);
        } catch (KeeperException e) {
            throw new LockException(e);
        }

    }

    public void lockInterruptibly() throws InterruptedException {
        this.lock();
    }

    public boolean tryLock() {
        try {
            String splitStr = "_lock_";
            if (lockName.contains(splitStr)) {
                throw new Exception("lockName can not contains '_lock_'");
            }
            if (StringUtil.isNullOrEmpty(myZnode)) {
                String path = root + "/" + lockName + splitStr;
                //创建有序的临时子节点，
                myZnode = zk.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                System.out.println(myZnode + " is created ");
            }
            //取出所有子节点
            List<String> childrens = zk.getChildren(root, false);
            //取出所有lockName的锁
            List<String> lockObjNodes = new ArrayList<String>();
            for (String node : childrens) {
                String _node = node.split(splitStr)[0];
                if(_node.equals(lockName)){
                    lockObjNodes.add(node);
                }
            }
            Collections.sort(lockObjNodes);
            if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
                //如果是最小的节点,则表示取得锁
                return true;
            }
            //如果不是最小的节点，找到比自己小1的节点
            String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
            waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
        } catch (Exception e) {
            throw new LockException(e);
        }
        return false;
    }

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        try {
            if (this.tryLock()) {
                return true;
            }
            return waitForLock(waitNode, time, unit);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    public void unlock() {
        try {
            System.out.println("unlock " + myZnode);
            zk.delete(myZnode, -1);
            myZnode = null;
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    public Condition newCondition() {
        return null;
    }

    /**
      * @despriction：zookeeper节点的监视器
      * @author  yeshiyuan
      * @created 2019/6/13 17:19
      * @params [watchedEvent]
      * @return void
      */
    public void process(WatchedEvent watchedEvent) {
        if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
            System.out.println("delete " + myZnode);
            if (this.latch != null) {
                this.latch.countDown();
            }
        }
    }

    private boolean waitForLock(String lower, long waitTime, TimeUnit unit) throws KeeperException, InterruptedException {
        String path = root + "/" + lower;
        Stat stat = zk.exists(path, true);
        if (stat != null) {
            System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + path);
            this.latch = new CountDownLatch(1);
            this.latch.await(waitTime, unit);
            this.latch = null;
            lock();
        }
        return true;
    }

    public class LockException extends RuntimeException {
        private static final long serialVersionUID = 1L;
        public LockException(String e){
            super(e);
        }
        public LockException(Exception e){
            super(e);
        }
    }


}
